RxJava2是在JVM中實現Reactive programming的library,其特色是可以輕鬆的切換thread,並和LiveData一樣以observer pattern處理事件。
Android開發經常會用到非同步(asynchronous)機制,像是網路連線、資料庫存取和其他耗時的工作,此時必須用AsyncTask或Hadnler等方式實現thread切換並建立callback以得知事件何時完成,而RxJava可以大量簡化這些繁瑣的過程,只要一行程式就能在background和UI thread之間切換。
雖然RxJava的學習曲線稍高,但用順手之後可以對程式帶來顯著的改善,且RxJava不用事前準備就可以開始練習,所以個人覺得起手還比Dagger2簡單。幾個主流library包括Retrofit和Architecture Components都有額外支援RxJava,足見其在Android開發有一定的影響力。
直接看程式,引用Jake Wharton大神在演講中舉的例子:
interface UserManager {
User getUser();
void setName(String name);
void setAge(int age);
}
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe");
System.out.println(um.getUser());
建立一簡單的UserManager讓我們取得User和更新Name、Age,底下印出User之後更新Name並再次印出。
那麼,如果這是一個耗時的工作,例如更新個人資料時需透過網路與伺服器同步,那就要建立callback以得知setName
何時完成,並在setName
完成時才顯示結果,讓程式變得reactive,
interface UserManager {
User getUser();
void setName(String name, Runnable callback);
void setAge(int age, Runnable callback);
}
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe", new Runnable() {
@Override
public void run() {
System.out.println(um.getUser());
}
});
此外網路連線並不一定會成功,所以我們須知道連線是成功還是失敗。
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe", new UserManager.Listener() {
@Override
public void success() {
System.out.println(um.getUser());
}
@Override
public void failure(IOException e) {
// TODO show the error...
}
});
最後,若連線成功時我們要更新UI上的使用者名稱,Android的網路連線只能發生在background thread,而更新UI則須回到UI thread,且在連線完成要更新UI時我們應檢查使用者是否還在Activity中,以免已經離開Activity而導致找不到UI元件的crash,於是就產生了這樣一隻小怪物:
public final class UserActivity extends Activity {
private final UserManager um = new UserManager();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.user);
TextView tv = (TextView) findViewById(R.id.user_name);
tv.setText(um.getUser().toString());
um.setName("Jane Doe", new UserManager.Listener() {
@Override
public void success() {
runOnUiThread(new Runnable() {
@Override
public void run() {
if (!isDestroyed()) {
tv.setText(um.getUser().toString());
}
}
});
}
@Override
public void failure(IOException e) {
// TODO show the error...
}
});
}
}
只是發出網路連線並在完成時更新TextView而已,setName
的callback變得巢狀縮排好幾層,將來做更多功能時會變得更難以閱讀和維護。
那使用RxJava2會是怎樣呢?
一樣是讓setName
在background thread執行,並在UI thread進行callback的內容:
Completable.fromAction(() -> {um.setName("Jane Doe")})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
tv.setText(um.getUser().toString());
}
@Override
public void onError(Throwable e) {
// TODO show the error...
}
});
fromAction
建立我們要執行的內容。subscribeOn
表示這些內容要在哪個thread執行,Schedulers.io()是RxJava提供的background thread之一,稍後解釋。observeOn
表示執行後的callback要在哪個thread執行。subscribe
時才會真的開始執行,就像LiveData的observe。
callback有onComplete()
和onError(Throwable e)
分別代表執行成功或失敗的處理。
其中最重要的是,藉由subscribeOn
和observeOn
我們就完成了thread切換:於io thread執行並在main thread更新結果,是不是很簡單?真的很簡單。
再看一次程式,由上到下每一行都清楚說明用意:
setName
雖然相對字數比較多,但程式解釋力比較高,尤其在將來情況越複雜時RxJava還是能保持一行一行沒有太多縮排的寫法,到時優勢會更明顯。
好,剛剛怕第一印象嚇到人其實做了一點手腳,上面的fromAction
那邊是用Android Studio把它縮起來或是用lambda語法的樣子,真正的樣子是這樣:
Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
um.setName("Jane Doe");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
tv.setText(um.getUser().toString());
}
@Override
public void onError(Throwable e) {
// TODO show the error...
}
});
雖然把fromAction
縮起來比較好看但本系列會盡量少用lambda寫法,此例是剛好setName
沒有回傳值所以影響不大,後續的內容我覺得看懂參數輸入輸出會更好上手,當初自己學習時看別人的lambda語法都不知道為什麼能變那樣,所以,雖然醜了點但我們會把參數全寫出來。
上面還少了一部份,檢查Activity是否還存在的isDestroyed()
,除了在onComplete()
中檢查之外,RxJava還有更好的作法:Disposable。
public final class UserActivity extends Activity {
private CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
...
disposables.add(
Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
um.setName("Jane Doe");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onComplete() {
tv.setText(um.getUser().toString());
}
@Override
public void onError(Throwable e) {
// TODO show the error...
}
}));
}
@Override
public void onDestroy() {
super.onDestroy();
disposables.clear();
}
}
建立一個CompositeDisposable並用add()
將Completable新增至其中,在onDestroy()
時呼叫clear()
就會清除disposables內的所有作業。這樣當多個連線運作時不用每個都檢查isDestroyed()
,只要都新增到disposables就可以在Destory時全部取消。
正式的介紹一下RxJava2,在這邊先說明幾個重要的東西,現在還不會全部用到但方便之後幾天查閱。
dependencie有兩個,一個是RxJava2本身,另一個是專為Android設計的RxAndroid,提供Android開發的輔助功能例如切換thread時可以選擇Android main thread。
implementation "io.reactivex.rxjava2:rxjava:2.1.7"
implementation "io.reactivex.rxjava2:rxandroid:2.0.1"
接著是class,概略的說,RxJava分成幾個步驟進行:
RxJava的起始需建立Observable,例如上面我們用fromAction
將setName
變成了Completable,拆開寫是長這樣:
Completable completable = Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
...
}
});
completable.subscribeOn(...)
建立completable之後就可以用RxJava的各種功能了。
RxJava提供多種Observable,主要差異在於callback的類型不同,有以下這些:
Completable
適合用在執行的內容沒有回傳值,只要知道成功或失敗就好的時候,例如更新個人資料。在callback有onComplete()
和onError(Throwable e)
兩個方法,其中onComplete()
是沒有參數的表示執行完不會有回傳值。
Completable.fromAction(...)
.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
Maybe
適合用在不一定有回傳值時,例如資料庫查詢,如果有撈到資料會進入onSuccess(T)
,否則onComplete()
,兩者互斥只會發生其中一種。<String>
表示回傳值的型態,依照需求可以是String、Integer或自訂的物件User、Repo等等。至於onError(Throwable e)
這是每個Observable都有的,下面就省略不提了。
Maybe.fromCallable(...)
.subscribe(new DisposableMaybeObserver<String>() {
@Override
public void onSuccess(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Single
用在執行完一定要有回傳值時,例如API抓資料時一定要有Json內容回傳,成功即進入onSuccess(T)
。
Single.fromCallable(...)
.subscribe(new DisposableSingleObserver<String>() {
@Override
public void onSuccess(String s) {
}
@Override
public void onError(Throwable e) {
}
});
Observable
嗯它就叫做Observable,是RxJava一代最早出現的單位,所以在教學文章的能見度很高。適合用在多次執行的內容例如迴圈,每一次執行迴圈會進入onNext(T)
,全部完成時進入onComplete()
。
Observable.fromCallable(...)
.subscribe(new DisposableObserver<String>() {
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Flowable
跟上面的Observable一樣適合用在多次執行,callback也一模一樣,不同的是Flowable有支援backpressure,當資料發送的速度快過於處理速度時,可以讓發送速度減緩。
介紹完這些Observable的特色後要看看它們的建立方式,也許你已經注意到上面Completable是用fromAction
而其它是用fromCallable
,差異是前者沒有回傳值而後者有,完整寫法:
// While setName return nothing, use fromAction.
Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
um.setName("Jane Doe");
}
})
.subscribe(...);
// If setName has return value which type is string, use fromCallable.
Single.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return um.setName("Jane Doe");
}
})
.subscribe(...);
各Observable可用的建立方式:
Completable.fromCallable(() -> "Ignored!");
Completable.fromAction(() -> System.out.println("Hello"));
Completable.fromRunnable(() -> System.out.println("Hello"));
Maybe.fromCallable(() -> "Hello");
Maybe.fromAction(() -> System.out.println("Hello"));
Maybe.fromRunnable(() -> System.out.println("Hello"))
Single.fromCallable(() -> "Hello");
Flowable.fromCallable(() -> "Hello");
Observable.fromCallable(() -> "Hello");
Completable也可以用fromCallable
但回傳結果會被忽略,其它的如果寫"Hello"就是有回傳值,寫println的沒有。
這樣就完成第1步建立Observable了,接著看thread,會簡單很多。
Schedulers可以視為RxJava中thread的代稱,有以下這幾種:
指定thread的方式為subscribeOn
和observeOn
,前者設置Observable要在哪個thread執行,後者是callback要發生在哪個thread。
如果一開始記不住的話,個人提供一種無腦的記法...
Completable.fromAction(...)
.subscribeOn(Schedulers.io()) // 這行以上的內容要在io thread執行
.observeOn(AndroidSchedulers.mainThread()) // 以下要在main thread執行
.subscribe(...)
這種方式的前提是subscribeOn
和observeOn
要照順序寫,且只適合用在像上面這樣簡單的內容,如果Observable串聯很長的就不一定適用了。
對Schedulers還是不太了解的話可以看這篇更詳細的說明。
以上設置完畢以後就用subscribe讓Observable啟動,可以依需求寫想要的subscribe方式。
一般可以用DisposableXXXObserver,這包含所有的callback。
Completable.fromAction(...)
.subscribeOn(...)
.observeOn(...)
.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
若只要在收到特定回傳值類型時才觸發callback,可以用Consumer。
Single.fromCallable(...)
.subscribeOn(...)
.observeOn(...)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
也可以單純的執行,不用任何callback。
Completable.fromAction(...)
.subscribeOn(...)
.observeOn(...)
.subscribe();
如果想要像開始時提到的用Disposable取消作業,要用subscribeWith
,才能將Observable加進Disposable中。
Completable.fromAction(...)
.subscribeOn(...)
.observeOn(...)
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
OK終於結束了,再複習一下:
subscribeOn
設置Observable要在哪個thread執行。observeOn
設置callback內容要在哪個thread執行。今天先有個概念,明天就會開始寫程式與Retrofit搭配使用。
Reference:
Exploring RxJava 2 for Android
Multi-Threading Like a Boss in Android With RxJava 2